diff --git a/Cargo.lock b/Cargo.lock index e97b3b3d..f4d4b9a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -337,6 +337,17 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "829694371bd7bbc6aee17c4ff624aad8bf9f4dc06c6f9f6071eaa08c89530d10" +[[package]] +name = "fail" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be3c61c59fdc91f5dbc3ea31ee8623122ce80057058be560654c5d410d181a6" +dependencies = [ + "lazy_static", + "log", + "rand", +] + [[package]] name = "fastrand" version = "1.3.3" @@ -1133,6 +1144,7 @@ version = "0.1.0" dependencies = [ "backtrace", "bincode", + "fail", "itertools", "kernel", "log", diff --git a/src/representation/src/lib.rs b/src/representation/src/lib.rs index 76547552..c1603fcd 100644 --- a/src/representation/src/lib.rs +++ b/src/representation/src/lib.rs @@ -19,19 +19,6 @@ use sql_types::SqlType; use sqlparser::ast::Value; use std::convert::TryFrom; -// owned parallel of Datum but owns the content. -// pub enum Value { -// Null, -// True, -// False, -// Int32(i32), -// Int64(i64), -// Float32(f32), -// Float64(f64), -// String(String), -// // Bytes(Vec) -// } - /// value shared by the row. #[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)] pub enum Datum<'a> { @@ -119,7 +106,6 @@ impl<'a> Datum<'a> { Datum::SqlType(val) } - // @TODO: Add accessor helper functions. pub fn as_i16(&self) -> i16 { match self { Self::Int16(val) => *val, @@ -190,8 +176,6 @@ impl<'a> Datum<'a> { _ => panic!("invalid use of Datum::as_sql_type"), } } - - // arithmetic operations } #[derive(Debug, Clone)] @@ -425,13 +409,7 @@ pub fn unpack_raw(data: &[u8]) -> Vec { TypeTag::SqlType => { let val = unsafe { read::(data, &mut index) }; Datum::from_sql_type(val) - } // SqlType::Decimal | - // SqlType::Time | - // SqlType::TimeWithTimeZone | - // SqlType::Timestamp | - // SqlType::TimestampWithTimeZone | - // SqlType::Date | - // SqlType::Interval => unimplemented!() + } }; res.push(datum) } @@ -442,96 +420,57 @@ pub fn unpack_raw(data: &[u8]) -> Vec { mod tests { use super::*; - #[test] - fn row_packing_single() { - let datums = vec![Datum::from_bool(true)]; - let row = Binary::pack(&datums); - assert_eq!(row, Binary::with_data(vec![0x1])); - } - - #[test] - fn row_packing_multiple() { - let datums = vec![Datum::from_bool(true), Datum::from_i32(100000)]; - let row = Binary::pack(&datums); - assert_eq!(row, Binary::with_data(vec![0x1, 0x4, 0xa0, 0x86, 0x1, 0x0])); - } - - #[test] - fn row_packing_with_floats() { - let datums = vec![ - Datum::from_bool(false), - Datum::from_i32(100000), - Datum::from_f32(100.134_21), - ]; - let row = Binary::pack(&datums); - assert_eq!( - row, - Binary::with_data(vec![0x2, 0x4, 0xa0, 0x86, 0x1, 0x0, 0x7, 0xb7, 0x44, 0xc8, 0x42]) - ); - } - - #[test] - fn row_packing_with_null() { - let datums = vec![Datum::from_bool(true), Datum::from_null(), Datum::from_i32(100000)]; - let row = Binary::pack(&datums); - assert_eq!(row, Binary::with_data(vec![0x1, 0x0, 0x4, 0xa0, 0x86, 0x1, 0x0])); - } + #[cfg(test)] + mod pack_unpack_types { + use super::*; - #[test] - fn row_packing_string() { - let datums = vec![Datum::from_bool(true), Datum::from_str("hello")]; - let row = Binary::pack(&datums); - assert_eq!( - row, - Binary::with_data(vec![ - 0x1, 0x9, 0x5, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x65, 0x6c, 0x6c, 0x6f - ]) - ); - } + #[test] + fn null() { + let data = vec![Datum::from_null()]; + let row = Binary::pack(&data); + assert_eq!(data, row.unpack()); + } - #[test] - fn row_unpacking_single() { - let datums = vec![Datum::from_bool(true)]; - let row = Binary::pack(&datums); - assert_eq!(row.unpack(), datums); - } + #[test] + fn booleans() { + let data = vec![Datum::from_bool(true)]; + let row = Binary::pack(&data); + assert_eq!(data, row.unpack()); + } - #[test] - fn row_unpacking_multiple() { - let datums = vec![Datum::from_bool(true), Datum::from_i32(100000)]; - let row = Binary::pack(&datums); - assert_eq!(row.unpack(), datums); - } + #[test] + fn floats() { + let data = vec![Datum::from_f32(1000.123), Datum::from_f64(100.134_219_234_555)]; + let row = Binary::pack(&data); + assert_eq!(data, row.unpack()); + } - #[test] - fn row_unpacking_with_floats() { - let datums = vec![ - Datum::from_bool(false), - Datum::from_i32(100000), - Datum::from_f64(100.134_212_309_847), - ]; - let row = Binary::pack(&datums); - assert_eq!(row.unpack(), datums); - } + #[test] + fn integers() { + let data = vec![Datum::from_i16(100), Datum::from_i32(1_000), Datum::from_i64(10_000)]; + let row = Binary::pack(&data); + assert_eq!(data, row.unpack()); + } - #[test] - fn row_unpacking_with_null() { - let datums = vec![Datum::from_bool(true), Datum::from_null(), Datum::from_i32(100000)]; - let row = Binary::pack(&datums); - assert_eq!(row.unpack(), datums); - } + #[test] + fn unsigned_integers() { + let data = vec![Datum::from_u64(10_000)]; + let row = Binary::pack(&data); + assert_eq!(data, row.unpack()); + } - #[test] - fn row_unpacking_string() { - let datums = vec![Datum::from_bool(true), Datum::from_str("hello")]; - let row = Binary::pack(&datums); - assert_eq!(row.unpack(), datums); - } + #[test] + fn strings() { + let data = vec![Datum::from_string("string".to_owned()), Datum::from_str("hello")]; + let row = Binary::pack(&data); + assert_eq!(vec![Datum::from_str("string"), Datum::from_str("hello")], row.unpack()); + } - #[test] - fn row_unpacking_sql_type() { - let data = vec![Datum::from_sql_type(SqlType::VarChar(32))]; - let row = Binary::pack(&data); - assert_eq!(vec![Datum::from_sql_type(SqlType::VarChar(32))], row.unpack()); + #[test] + fn sql_type() { + let data = vec![Datum::from_sql_type(SqlType::VarChar(32))]; + let row = Binary::pack(&data); + assert_eq!(data, row.unpack()); + } } } diff --git a/src/sql_engine/src/catalog_manager/data_definition.rs b/src/sql_engine/src/catalog_manager/data_definition.rs index 67dd23b5..bafaa551 100644 --- a/src/sql_engine/src/catalog_manager/data_definition.rs +++ b/src/sql_engine/src/catalog_manager/data_definition.rs @@ -27,7 +27,7 @@ use std::{ Arc, RwLock, }, }; -use storage::{Database, InitStatus, PersistentDatabase, StorageError}; +use storage::{Database, InitStatus, PersistentDatabase}; const SYSTEM_CATALOG: &'_ str = "system"; // CREATE SCHEMA DEFINITION_SCHEMA @@ -548,12 +548,15 @@ impl DataDefinition { pub(crate) fn persistent(path: &PathBuf) -> SystemResult { let system_catalog = PersistentDatabase::new(path.join(SYSTEM_CATALOG)); let (catalogs, catalog_ids) = match system_catalog.init(DEFINITION_SCHEMA) { - Ok(InitStatus::Loaded) => { + Ok(Ok(InitStatus::Loaded)) => { let mut max_id = 0; let catalogs = system_catalog .read(DEFINITION_SCHEMA, CATALOG_NAMES_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have CATALOG_NAMES table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(id, name)| { let catalog_id = id.unpack()[0].as_u64(); max_id = max_id.max(catalog_id); @@ -563,27 +566,34 @@ impl DataDefinition { .collect::>(); (catalogs, max_id) } - Ok(InitStatus::Created) => { + Ok(Ok(InitStatus::Created)) => { system_catalog .create_object(DEFINITION_SCHEMA, CATALOG_NAMES_TABLE) + .expect("no io error") + .expect("no platform error") .expect("table CATALOG_NAMES is created"); system_catalog .create_object(DEFINITION_SCHEMA, SCHEMATA_TABLE) + .expect("no io error") + .expect("no platform error") .expect("table SCHEMATA is created"); system_catalog .create_object(DEFINITION_SCHEMA, TABLES_TABLE) + .expect("no io error") + .expect("no platform error") .expect("table TABLES is created"); system_catalog .create_object(DEFINITION_SCHEMA, COLUMNS_TABLE) + .expect("no io error") + .expect("no platform error") .expect("table COLUMNS is created"); (HashMap::new(), 0) } - Err(StorageError::RuntimeCheckError) => { + _ => { return Err(SystemError::runtime_check_failure( "No Path in SledDatabaseCatalog".to_owned(), )) } - Err(StorageError::SystemError(error)) => return Err(error), }; Ok(DataDefinition { catalog_ids: AtomicU64::new(catalog_ids), @@ -608,6 +618,8 @@ impl DataDefinition { Binary::pack(&[Datum::from_str(catalog_name)]), )], ) + .expect("no io error") + .expect("no platform error") .expect("to save catalog"); } } @@ -640,6 +652,8 @@ impl DataDefinition { CATALOG_NAMES_TABLE, vec![Binary::pack(&[Datum::from_u64(catalog.id())])], ) + .expect("no io error") + .expect("no platform error") .expect("to remove catalog"); } } @@ -662,11 +676,16 @@ impl DataDefinition { CATALOG_NAMES_TABLE, vec![Binary::pack(&[Datum::from_u64(catalog.id())])], ) + .expect("no io error") + .expect("no platform error") .expect("to remove catalog"); let schema_record_ids = system_catalog .read(DEFINITION_SCHEMA, SCHEMATA_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have SCHEMATA table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, _columns)| { let catalog_id = record_id.unpack()[0].as_u64(); (catalog_id, record_id) @@ -676,11 +695,16 @@ impl DataDefinition { .collect(); system_catalog .delete(DEFINITION_SCHEMA, SCHEMATA_TABLE, schema_record_ids) + .expect("no io error") + .expect("no platform error") .expect("to remove schemas under catalog"); let table_record_ids = system_catalog .read(DEFINITION_SCHEMA, TABLES_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have TABLES table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, _columns)| { let catalog_id = record_id.unpack()[0].as_u64(); (catalog_id, record_id) @@ -690,11 +714,16 @@ impl DataDefinition { .collect(); system_catalog .delete(DEFINITION_SCHEMA, TABLES_TABLE, table_record_ids) + .expect("no io error") + .expect("no platform error") .expect("to remove tables under catalog"); let table_column_record_ids = system_catalog .read(DEFINITION_SCHEMA, COLUMNS_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have COLUMNS table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, _data)| { let record = record_id.unpack(); let catalog = record[0].as_u64(); @@ -705,6 +734,8 @@ impl DataDefinition { .collect(); system_catalog .delete(DEFINITION_SCHEMA, COLUMNS_TABLE, table_column_record_ids) + .expect("no io error") + .expect("no platform error") .expect("to have remove tables columns under catalog"); } } @@ -732,6 +763,8 @@ impl DataDefinition { Binary::pack(&[Datum::from_str(catalog_name), Datum::from_str(schema_name)]), )], ) + .expect("no io error") + .expect("no platform error") .expect("to save schema"); } } @@ -747,8 +780,11 @@ impl DataDefinition { if let Some(system_catalog) = self.system_catalog.as_ref() { let schema_id = system_catalog .read(DEFINITION_SCHEMA, SCHEMATA_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have SCHEMATA_TABLE table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, columns)| { let _catalog_id = record_id.unpack()[0].as_u64(); let id = record_id.unpack()[1].as_u64(); @@ -809,6 +845,8 @@ impl DataDefinition { Datum::from_u64(schema_id), ])], ) + .expect("no io error") + .expect("no platform error") .expect("to remove schema"); } Ok(()) @@ -833,11 +871,16 @@ impl DataDefinition { Datum::from_u64(schema_id), ])], ) + .expect("no io error") + .expect("no platform error") .expect("to remove schema"); let table_record_ids = system_catalog .read(DEFINITION_SCHEMA, TABLES_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have TABLES table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, _columns)| { let ids = record_id.unpack(); let catalog_id = ids[0].as_u64(); @@ -851,11 +894,16 @@ impl DataDefinition { .collect(); system_catalog .delete(DEFINITION_SCHEMA, TABLES_TABLE, table_record_ids) + .expect("no io error") + .expect("no platform error") .expect("to remove tables under catalog"); let table_column_record_ids = system_catalog .read(DEFINITION_SCHEMA, COLUMNS_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have COLUMNS table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, _data)| { let record = record_id.unpack(); let catalog = record[0].as_u64(); @@ -869,6 +917,8 @@ impl DataDefinition { .collect(); system_catalog .delete(DEFINITION_SCHEMA, COLUMNS_TABLE, table_column_record_ids) + .expect("no io error") + .expect("no platform error") .expect("to have remove tables columns under catalog"); } Ok(()) @@ -884,8 +934,11 @@ impl DataDefinition { if let Some(system_catalog) = self.system_catalog.as_ref() { for (id, _catalog, schema) in system_catalog .read(DEFINITION_SCHEMA, SCHEMATA_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have SCHEMATA_TABLE table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, columns)| { let id = record_id.unpack()[1].as_u64(); let columns = columns.unpack(); @@ -915,8 +968,11 @@ impl DataDefinition { if let Some(system_catalog) = self.system_catalog.as_ref() { let schema_id = system_catalog .read(DEFINITION_SCHEMA, SCHEMATA_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have SCHEMATA_TABLE table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, columns)| { let id = record_id.unpack()[0].as_u64(); let name = columns.unpack()[1].as_str().to_owned(); @@ -939,8 +995,11 @@ impl DataDefinition { if let Some(system_catalog) = self.system_catalog.as_ref() { let table_info = system_catalog .read(DEFINITION_SCHEMA, TABLES_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have TABLES table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, data)| { let id = record_id.unpack()[0].as_u64(); let data = data.unpack(); @@ -956,8 +1015,11 @@ impl DataDefinition { let mut max_id = 0; let table_columns = system_catalog .read(DEFINITION_SCHEMA, COLUMNS_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have COLUMNS table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, data)| { let id = record_id.unpack()[3].as_u64(); let data = data.unpack(); @@ -1025,6 +1087,8 @@ impl DataDefinition { ]), )], ) + .expect("no io error") + .expect("no platform error") .expect("to save table info"); for (id, column) in created_table.columns() { system_catalog @@ -1048,6 +1112,8 @@ impl DataDefinition { ]), )], ) + .expect("no io error") + .expect("no platform error") .expect("to save column"); } } @@ -1075,6 +1141,8 @@ impl DataDefinition { Datum::from_u64(table_id), ])], ) + .expect("no io error") + .expect("no platform error") .expect("to remove table"); } } @@ -1092,8 +1160,11 @@ impl DataDefinition { if let Some(system_catalog) = self.system_catalog.as_ref() { for (table_id, _catalog, _schema, table) in system_catalog .read(DEFINITION_SCHEMA, TABLES_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have SCHEMATA_TABLE table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, columns)| { let id = record_id.unpack()[1].as_u64(); let columns = columns.unpack(); @@ -1107,8 +1178,11 @@ impl DataDefinition { let mut max_id = 0; let table_columns = system_catalog .read(DEFINITION_SCHEMA, COLUMNS_TABLE) + .expect("no io error") + .expect("no platform error") .expect("to have COLUMNS table") .map(Result::unwrap) + .map(Result::unwrap) .map(|(record_id, data)| { let id = record_id.unpack()[3].as_u64(); let data = data.unpack(); diff --git a/src/sql_engine/src/catalog_manager/mod.rs b/src/sql_engine/src/catalog_manager/mod.rs index ad6a23aa..cf8a9f93 100644 --- a/src/sql_engine/src/catalog_manager/mod.rs +++ b/src/sql_engine/src/catalog_manager/mod.rs @@ -18,7 +18,7 @@ use std::{ path::PathBuf, sync::atomic::{AtomicU64, Ordering}, }; -use storage::{Database, InMemoryDatabase, InitStatus, PersistentDatabase, ReadCursor, Row, StorageError}; +use storage::{Database, InMemoryDatabase, InitStatus, PersistentDatabase, ReadCursor, Row}; mod data_definition; @@ -69,25 +69,26 @@ impl CatalogManager { Some(_id) => { for schema in data_definition.schemas(DEFAULT_CATALOG) { match catalog.init(schema.as_str()) { - Ok(InitStatus::Loaded) => { + Ok(Ok(InitStatus::Loaded)) => { for table in data_definition.tables(DEFAULT_CATALOG, schema.as_str()) { catalog.open_object(schema.as_str(), table.as_str()); } } - Ok(InitStatus::Created) => { + Ok(Ok(InitStatus::Created)) => { log::error!("Schema {:?} should have been already created", schema); return Err(SystemError::bug_in_sql_engine( Operation::Access, Object::Schema(schema.as_str()), )); } - Err(error) => { + Ok(Err(error)) => { log::error!("Error during schema {:?} initialization {:?}", schema, error); return Err(SystemError::bug_in_sql_engine( Operation::Access, Object::Schema(schema.as_str()), )); } + Err(io_error) => return Err(SystemError::io(io_error)), } } } @@ -123,9 +124,8 @@ impl CatalogManager { pub fn create_schema(&self, schema_name: &str) -> SystemResult<()> { self.data_definition.create_schema(DEFAULT_CATALOG, schema_name); match self.data_storage.create_schema(schema_name) { - Ok(()) => Ok(()), - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + Ok(Ok(Ok(()))) => Ok(()), + _ => Err(SystemError::bug_in_sql_engine( Operation::Create, Object::Schema(schema_name), )), @@ -135,9 +135,8 @@ impl CatalogManager { pub fn drop_schema(&self, schema_name: &str, strategy: DropStrategy) -> SystemResult> { match self.data_definition.drop_schema(DEFAULT_CATALOG, schema_name, strategy) { Ok(()) => match self.data_storage.drop_schema(schema_name) { - Ok(()) => Ok(Ok(())), - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + Ok(Ok(Ok(()))) => Ok(Ok(())), + _ => Err(SystemError::bug_in_sql_engine( Operation::Drop, Object::Schema(schema_name), )), @@ -155,9 +154,8 @@ impl CatalogManager { self.data_definition .create_table(DEFAULT_CATALOG, schema_name, table_name, column_definitions); match self.data_storage.create_object(schema_name, table_name) { - Ok(()) => Ok(()), - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + Ok(Ok(Ok(()))) => Ok(()), + _ => Err(SystemError::bug_in_sql_engine( Operation::Create, Object::Table(schema_name, table_name), )), @@ -174,9 +172,8 @@ impl CatalogManager { self.data_definition .drop_table(DEFAULT_CATALOG, schema_name, table_name); match self.data_storage.drop_object(schema_name, table_name) { - Ok(()) => Ok(()), - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + Ok(Ok(Ok(()))) => Ok(()), + _ => Err(SystemError::bug_in_sql_engine( Operation::Drop, Object::Table(schema_name, table_name), )), @@ -186,9 +183,8 @@ impl CatalogManager { pub fn insert_into(&self, schema_name: &str, table_name: &str, values: Vec) -> SystemResult { log::debug!("{:#?}", values); match self.data_storage.write(schema_name, table_name, values) { - Ok(size) => Ok(size), - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + Ok(Ok(Ok(size))) => Ok(size), + _ => Err(SystemError::bug_in_sql_engine( Operation::Access, Object::Table(schema_name, table_name), )), @@ -197,9 +193,8 @@ impl CatalogManager { pub fn table_scan(&self, schema_name: &str, table_name: &str) -> SystemResult { match self.data_storage.read(schema_name, table_name) { - Ok(read) => Ok(read), - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + Ok(Ok(Ok(read))) => Ok(read), + _ => Err(SystemError::bug_in_sql_engine( Operation::Access, Object::Table(schema_name, table_name), )), @@ -208,9 +203,8 @@ impl CatalogManager { pub fn update_all(&self, schema_name: &str, table_name: &str, rows: Vec) -> SystemResult { match self.data_storage.write(schema_name, table_name, rows) { - Ok(size) => Ok(size), - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + Ok(Ok(Ok(size))) => Ok(size), + _ => Err(SystemError::bug_in_sql_engine( Operation::Access, Object::Table(schema_name, table_name), )), @@ -219,17 +213,20 @@ impl CatalogManager { pub fn delete_all_from(&self, schema_name: &str, table_name: &str) -> SystemResult { match self.data_storage.read(schema_name, table_name) { - Ok(reads) => { - let keys = reads.map(Result::unwrap).map(|(key, _)| key).collect(); + Ok(Ok(Ok(reads))) => { + let keys = reads + .map(Result::unwrap) + .map(Result::unwrap) + .map(|(key, _)| key) + .collect(); match self.data_storage.delete(schema_name, table_name, keys) { - Ok(len) => Ok(len), + Ok(Ok(Ok(len))) => Ok(len), _ => unreachable!( "all errors that make code fall in here should have been handled in read operation" ), } } - Err(StorageError::SystemError(error)) => Err(error), - Err(StorageError::RuntimeCheckError) => Err(SystemError::bug_in_sql_engine( + _ => Err(SystemError::bug_in_sql_engine( Operation::Access, Object::Table(schema_name, table_name), )), diff --git a/src/sql_engine/src/catalog_manager/tests/queries/delete.rs b/src/sql_engine/src/catalog_manager/tests/queries/delete.rs index f4afa8db..fd7f9076 100644 --- a/src/sql_engine/src/catalog_manager/tests/queries/delete.rs +++ b/src/sql_engine/src/catalog_manager/tests/queries/delete.rs @@ -54,7 +54,7 @@ fn delete_all_from_table(default_schema_name: &str, storage_with_schema: Catalog assert_eq!( storage_with_schema .table_scan("schema_name", "table_name") - .map(Iterator::collect), + .map(|iter| iter.map(Result::unwrap).map(Result::unwrap).collect()), Ok(vec![]) ); } diff --git a/src/sql_engine/src/catalog_manager/tests/queries/select.rs b/src/sql_engine/src/catalog_manager/tests/queries/select.rs index 7bc35c83..051ad997 100644 --- a/src/sql_engine/src/catalog_manager/tests/queries/select.rs +++ b/src/sql_engine/src/catalog_manager/tests/queries/select.rs @@ -43,7 +43,11 @@ fn select_all_from_table_with_many_columns(default_schema_name: &str, with_small assert_eq!( with_small_ints_table .table_scan(default_schema_name, "table_name") - .map(|read| read.map(Result::unwrap).map(|(_key, values)| values).collect()), + .map(|read| read + .map(Result::unwrap) + .map(Result::unwrap) + .map(|(_key, values)| values) + .collect()), Ok(vec![Binary::with_data(b"1|2|3".to_vec())]) ); } diff --git a/src/sql_engine/src/dml/select.rs b/src/sql_engine/src/dml/select.rs index 150fd02c..c05e0b19 100644 --- a/src/sql_engine/src/dml/select.rs +++ b/src/sql_engine/src/dml/select.rs @@ -251,6 +251,7 @@ impl<'sc> SelectCommand<'sc> { } let values: Vec> = records + .map(Result::unwrap) .map(Result::unwrap) .map(|(_key, values)| { let row: Vec = values.unpack().into_iter().map(|datum| datum.to_string()).collect(); diff --git a/src/sql_engine/src/dml/update.rs b/src/sql_engine/src/dml/update.rs index 9defedef..de240698 100644 --- a/src/sql_engine/src/dml/update.rs +++ b/src/sql_engine/src/dml/update.rs @@ -183,6 +183,7 @@ impl UpdateCommand { let to_update: Vec = match self.storage.table_scan(&schema_name, &table_name) { Err(error) => return Err(error), Ok(reads) => reads + .map(Result::unwrap) .map(Result::unwrap) .map(|(key, values)| { let mut datums = unpack_raw(values.to_bytes()); diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index e20c20af..840daab2 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -14,8 +14,49 @@ serde = { version = "1.0.115", features = ["derive"] } bincode = "1.3.1" representation = { path = "../representation" } itertools = "0.9.0" +fail = { version = "0.4.0", features = ["failpoints"] } [dev-dependencies] backtrace = "0.3.49" rstest = "0.6.4" tempfile = "3.1.0" + +[[test]] +name = "sled_fails_to_open_db" +path = "tests/failpoints/open_db.rs" +required-features = ["fail/failpoints"] + +[[test]] +name = "sled_fails_to_open_tree" +path = "tests/failpoints/open_tree.rs" +required-features = ["fail/failpoints"] + +[[test]] +name = "sled_fails_to_drop_db" +path = "tests/failpoints/drop_db.rs" +required-features = ["fail/failpoints"] + +[[test]] +name = "sled_fails_to_drop_tree" +path = "tests/failpoints/drop_tree.rs" +required-features = ["fail/failpoints"] + +[[test]] +name = "sled_fails_to_insert_into_tree" +path = "tests/failpoints/insert_into_tree.rs" +required-features = ["fail/failpoints"] + +[[test]] +name = "sled_fails_to_flush_tree" +path = "tests/failpoints/flush_tree.rs" +required-features = ["fail/failpoints"] + +[[test]] +name = "sled_fails_to_iterate_over_tree" +path = "tests/failpoints/iterate_over_tree.rs" +required-features = ["fail/failpoints"] + +[[test]] +name = "sled_fails_to_remove_from_tree" +path = "tests/failpoints/remove_from_tree.rs" +required-features = ["fail/failpoints"] diff --git a/src/storage/src/in_memory.rs b/src/storage/src/in_memory.rs index 5fa87a87..fe47a31e 100644 --- a/src/storage/src/in_memory.rs +++ b/src/storage/src/in_memory.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{Database, Key, ReadCursor, Row, StorageError, StorageResult, Values}; -use kernel::SystemResult; +use crate::{Database, DefinitionError, Key, ReadCursor, RowResult, StorageError, Values}; use std::{ collections::{BTreeMap, HashMap}, + io::{self}, sync::RwLock, }; @@ -37,31 +37,35 @@ pub struct InMemoryDatabase { } impl Database for InMemoryDatabase { - fn create_schema(&self, schema_name: &str) -> StorageResult<()> { + fn create_schema(&self, schema_name: &str) -> io::Result, StorageError>> { if self .schemas .read() .expect("to acquire read lock") .contains_key(schema_name) { - Err(StorageError::RuntimeCheckError) + Ok(Ok(Err(DefinitionError::SchemaAlreadyExists))) } else { self.schemas .write() .expect("to acquire write lock") .insert(schema_name.to_owned(), Schema::default()); - Ok(()) + Ok(Ok(Ok(()))) } } - fn drop_schema(&self, schema_name: &str) -> StorageResult<()> { + fn drop_schema(&self, schema_name: &str) -> io::Result, StorageError>> { match self.schemas.write().expect("to acquire write lock").remove(schema_name) { - Some(_namespace) => Ok(()), - None => Err(StorageError::RuntimeCheckError), + Some(_namespace) => Ok(Ok(Ok(()))), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn create_object(&self, schema_name: &str, object_name: &str) -> StorageResult<()> { + fn create_object( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>> { match self .schemas .write() @@ -70,17 +74,21 @@ impl Database for InMemoryDatabase { { Some(schema) => { if schema.objects.contains_key(object_name) { - Err(StorageError::RuntimeCheckError) + Ok(Ok(Err(DefinitionError::ObjectAlreadyExists))) } else { schema.objects.insert(object_name.to_owned(), StorageObject::default()); - Ok(()) + Ok(Ok(Ok(()))) } } - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn drop_object(&self, schema_name: &str, object_name: &str) -> StorageResult<()> { + fn drop_object( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>> { match self .schemas .write() @@ -88,14 +96,19 @@ impl Database for InMemoryDatabase { .get_mut(schema_name) { Some(schema) => match schema.objects.remove(object_name) { - Some(_) => Ok(()), - None => Err(StorageError::RuntimeCheckError), + Some(_) => Ok(Ok(Ok(()))), + None => Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))), }, - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn write(&self, schema_name: &str, object_name: &str, rows: Vec<(Key, Values)>) -> StorageResult { + fn write( + &self, + schema_name: &str, + object_name: &str, + rows: Vec<(Key, Values)>, + ) -> io::Result, StorageError>> { match self .schemas .write() @@ -108,33 +121,42 @@ impl Database for InMemoryDatabase { for (key, value) in rows { object.records.insert(key, value); } - Ok(len) + Ok(Ok(Ok(len))) } - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))), }, - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn read(&self, schema_name: &str, object_name: &str) -> StorageResult { + fn read( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>> { match self.schemas.read().expect("to acquire read lock").get(schema_name) { Some(schema) => match schema.objects.get(object_name) { - Some(object) => Ok(Box::new( + Some(object) => Ok(Ok(Ok(Box::new( object .records .clone() .into_iter() - .map(Ok) - .collect::>>() + .map(|value| Ok(Ok(value))) + .collect::>() .into_iter(), - )), - None => Err(StorageError::RuntimeCheckError), + )))), + None => Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))), }, - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn delete(&self, schema_name: &str, object_name: &str, keys: Vec) -> StorageResult { + fn delete( + &self, + schema_name: &str, + object_name: &str, + keys: Vec, + ) -> io::Result, StorageError>> { match self .schemas .write() @@ -149,11 +171,11 @@ impl Database for InMemoryDatabase { .into_iter() .filter(|(key, _values)| !keys.contains(key)) .collect(); - Ok(keys.len()) + Ok(Ok(Ok(keys.len()))) } - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))), }, - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } } diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 16c49c3b..8c748c0a 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -12,17 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate kernel; -extern crate log; - -use kernel::{SystemError, SystemResult}; use representation::Binary; +use std::io::{self}; pub type Row = (Key, Values); pub type Key = Binary; pub type Values = Binary; -pub type ReadCursor = Box>>; -pub type StorageResult = std::result::Result; +pub type RowResult = io::Result>; +pub type ReadCursor = Box>; mod in_memory; mod persistent; @@ -36,24 +33,55 @@ pub enum InitStatus { #[derive(Debug, PartialEq)] pub enum StorageError { - RuntimeCheckError, - SystemError(SystemError), + Io, + CascadeIo(Vec), + Storage, +} + +#[derive(Debug, PartialEq)] +pub enum DefinitionError { + SchemaAlreadyExists, + SchemaDoesNotExist, + ObjectAlreadyExists, + ObjectDoesNotExist, } pub trait Database { - fn create_schema(&self, schema_name: &str) -> StorageResult<()>; + fn create_schema(&self, schema_name: &str) -> io::Result, StorageError>>; - fn drop_schema(&self, schema_name: &str) -> StorageResult<()>; + fn drop_schema(&self, schema_name: &str) -> io::Result, StorageError>>; - fn create_object(&self, schema_name: &str, object_name: &str) -> StorageResult<()>; + fn create_object( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>>; - fn drop_object(&self, schema_name: &str, object_name: &str) -> StorageResult<()>; + fn drop_object( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>>; - fn write(&self, schema_name: &str, object_name: &str, values: Vec) -> StorageResult; + fn write( + &self, + schema_name: &str, + object_name: &str, + values: Vec, + ) -> io::Result, StorageError>>; - fn read(&self, schema_name: &str, object_name: &str) -> StorageResult; + fn read( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>>; - fn delete(&self, schema_name: &str, object_name: &str, keys: Vec) -> StorageResult; + fn delete( + &self, + schema_name: &str, + object_name: &str, + keys: Vec, + ) -> io::Result, StorageError>>; } #[cfg(test)] diff --git a/src/storage/src/persistent.rs b/src/storage/src/persistent.rs index 1df58a78..d3a66f72 100644 --- a/src/storage/src/persistent.rs +++ b/src/storage/src/persistent.rs @@ -12,43 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{Database, InitStatus, Key, ReadCursor, Row, StorageError, StorageResult}; -use kernel::SystemError; +use crate::{Database, DefinitionError, InitStatus, Key, ReadCursor, Row, RowResult, StorageError}; use representation::Binary; -use sled::{Db as Schema, Error as SledError}; +use sled::{Db as Schema, DiskPtr, Error as SledError, IVec, Tree}; use std::{ collections::HashMap, + io::{self, ErrorKind}, path::PathBuf, sync::{Arc, RwLock}, }; -pub struct SledErrorMapper; - -impl SledErrorMapper { - fn map(error: SledError) -> SystemError { - match error { - SledError::CollectionNotFound(system_file) => SystemError::unrecoverable(format!( - "System file [{}] can't be found", - String::from_utf8(system_file.to_vec()).expect("name of system file") - )), - SledError::Unsupported(operation) => { - SystemError::unrecoverable(format!("Unsupported operation [{}] was used on Sled", operation)) - } - SledError::Corruption { at, bt: _bt } => { - if let Some(at) = at { - SystemError::unrecoverable(format!("Sled encountered corruption at {}", at)) - } else { - SystemError::unrecoverable("Sled encountered corruption".to_owned()) - } - } - SledError::ReportableBug(description) => { - SystemError::unrecoverable(format!("Sled encountered reportable BUG: {}", description)) - } - SledError::Io(error) => SystemError::io(error), - } - } -} - pub struct PersistentDatabase { path: PathBuf, schemas: RwLock>>, @@ -62,10 +35,10 @@ impl PersistentDatabase { } } - pub fn init(&self, schema_name: &str) -> StorageResult { + pub fn init(&self, schema_name: &str) -> io::Result> { let path_to_schema = PathBuf::from(&self.path).join(schema_name); log::info!("path to schema {:?}", path_to_schema); - match sled::open(path_to_schema) { + match self.open_database(path_to_schema) { Ok(schema) => { let recovered = schema.was_recovered(); self.schemas @@ -74,146 +47,284 @@ impl PersistentDatabase { .insert(schema_name.to_owned(), Arc::new(schema)); log::debug!("namespaces after initialization {:?}", self.schemas); if recovered { - Ok(InitStatus::Loaded) + Ok(Ok(InitStatus::Loaded)) } else { - Ok(InitStatus::Created) + Ok(Ok(InitStatus::Created)) } } - Err(error) => Err(StorageError::SystemError(SledErrorMapper::map(error))), + Err(error) => match error { + SledError::Io(io_error) => Err(io_error), + SledError::Corruption { .. } => Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => Ok(Err(StorageError::Storage)), + }, } } pub fn open_object(&self, schema_name: &str, object_name: &str) { if let Some(schema) = self.schemas.read().expect("to acquire write lock").get(schema_name) { - schema - .open_tree(object_name) - .expect("to open tree") - .flush() - .expect("to flush"); + self.open_tree(schema.clone(), object_name) + .expect("no io error") + .expect("no platform error") + .expect("no definition error"); } } - fn new_schema(&self, schema_name: &str) -> StorageResult> { + fn open_database(&self, path_to_schema: PathBuf) -> Result { + fail::fail_point!("sled-fail-to-open-db", |kind| Err(sled_error(kind))); + sled::open(path_to_schema) + } + + fn open_tree( + &self, + schema: Arc, + object_name: &str, + ) -> io::Result, StorageError>> { + match self.open_tree_with_failpoint(schema, object_name) { + Ok(tree) => Ok(Ok(Ok(tree))), + Err(error) => match error { + SledError::Io(io_error) => Err(io_error), + SledError::Corruption { .. } => Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))), + }, + } + } + + fn open_tree_with_failpoint(&self, schema: Arc, object_name: &str) -> Result { + fail::fail_point!("sled-fail-to-open-tree", |kind| Err(sled_error(kind))); + schema.open_tree(object_name) + } + + fn drop_database(&self, schema: Arc) -> io::Result, StorageError>> { + let mut io_errors = vec![]; + for tree_name in schema.tree_names() { + let name = tree_name.clone(); + match self.drop_database_cascade_with_failpoint(schema.clone(), tree_name) { + Ok(true) => log::info!("{:?} was dropped", name), + Ok(false) => log::info!("{:?} was not dropped", name), + Err(SledError::Io(_)) => io_errors.push(String::from_utf8_lossy(&name).into()), + Err(SledError::Corruption { .. }) => return Ok(Err(StorageError::Storage)), + Err(SledError::CollectionNotFound(_)) => return Ok(Err(StorageError::Storage)), + Err(SledError::Unsupported(message)) => { + if message != "cannot remove the core structures" { + return Ok(Err(StorageError::Storage)); + } + } + Err(SledError::ReportableBug(_)) => return Ok(Err(StorageError::Storage)), + } + } + if io_errors.is_empty() { + Ok(Ok(Ok(()))) + } else { + Ok(Err(StorageError::CascadeIo(io_errors))) + } + } + + fn drop_database_cascade_with_failpoint(&self, schema: Arc, tree: IVec) -> Result { + fail::fail_point!("sled-fail-to-drop-db", |kind| { + if tree == b"__sled__default" { + Err(SledError::Unsupported("cannot remove the core structures".into())) + } else { + Err(sled_error(kind)) + } + }); + schema.drop_tree(tree) + } + + fn drop_tree_with_failpoint(&self, schema: Arc, tree: IVec) -> Result { + fail::fail_point!("sled-fail-to-drop-tree", |kind| Err(sled_error(kind))); + schema.drop_tree(tree) + } + + fn insert_into_tree_with_failpoint( + &self, + tree: &Tree, + key: &Binary, + values: &Binary, + ) -> Result, SledError> { + fail::fail_point!("sled-fail-to-insert-into-tree", |kind| Err(sled_error(kind))); + tree.insert(key.to_bytes(), values.to_bytes()) + } + + fn tree_flush_with_failpoint(&self, tree: Tree) -> Result { + fail::fail_point!("sled-fail-to-flush-tree", |kind| Err(sled_error(kind))); + tree.flush() + } + + fn iterator_over_tree_with_failpoint(&self, object: Tree) -> Box>> { + fail::fail_point!("sled-fail-iterate-over-tree", |kind| Box::new( + vec![Err(sled_error(kind))].into_iter() + )); + Box::new(object.iter()) + } + fn remove_fro_tree_with_failpoint(&self, object: &Tree, key: Binary) -> Result, SledError> { + fail::fail_point!("sled-fail-to-remove-from-tree", |kind| Err(sled_error(kind))); + object.remove(key.to_bytes()) + } + + fn empty_iterator(&self) -> Box> { + Box::new(std::iter::empty()) + } +} + +impl Database for PersistentDatabase { + fn create_schema(&self, schema_name: &str) -> io::Result, StorageError>> { if self .schemas .read() .expect("to acquire read lock") .contains_key(schema_name) { - Err(StorageError::RuntimeCheckError) + Ok(Ok(Err(DefinitionError::SchemaAlreadyExists))) } else { let path_to_schema = PathBuf::from(&self.path).join(schema_name); log::info!("path to schema {:?}", path_to_schema); - match sled::open(path_to_schema) { + match self.open_database(path_to_schema) { Ok(schema) => { - let schema = Arc::new(schema); self.schemas .write() .expect("to acquire write lock") - .insert(schema_name.to_owned(), schema.clone()); - Ok(schema) + .insert(schema_name.to_owned(), Arc::new(schema)); + Ok(Ok(Ok(()))) } - Err(error) => Err(StorageError::SystemError(SledErrorMapper::map(error))), + Err(error) => match error { + SledError::Io(io_error) => Err(io_error), + SledError::Corruption { .. } => Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => Ok(Err(StorageError::Storage)), + }, } } } -} -impl Database for PersistentDatabase { - fn create_schema(&self, schema_name: &str) -> StorageResult<()> { - self.new_schema(schema_name).map(|_| ()) - } - - fn drop_schema(&self, schema_name: &str) -> StorageResult<()> { + fn drop_schema(&self, schema_name: &str) -> io::Result, StorageError>> { match self.schemas.write().expect("to acquire write lock").remove(schema_name) { - Some(schema) => { - for tree in schema.tree_names() { - let name = tree.clone(); - match schema.drop_tree(tree) { - Ok(true) => log::info!("{:?} was dropped", name), - Ok(false) => log::info!("{:?} was not dropped", name), - Err(error) => log::error!("{:?} was not dropped due to {:?}", name, error), - } - } - drop(schema); - Ok(()) - } - None => Err(StorageError::RuntimeCheckError), + Some(schema) => self.drop_database(schema), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn create_object(&self, schema_name: &str, object_name: &str) -> StorageResult<()> { + fn create_object( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>> { match self.schemas.read().expect("to acquire read lock").get(schema_name) { Some(schema) => { if schema.tree_names().contains(&(object_name.into())) { - Err(StorageError::RuntimeCheckError) + Ok(Ok(Err(DefinitionError::ObjectAlreadyExists))) } else { - match schema.open_tree(object_name) { - Ok(object) => { - log::debug!("tree {:?}.{:?} was created as {:?}", schema_name, object_name, object); - object.flush().expect("Ok"); - Ok(()) - } - Err(error) => Err(StorageError::SystemError(SledErrorMapper::map(error))), - } + self.open_tree(schema.clone(), object_name) + .map(|io| io.map(|storage| storage.map(|_object| ()))) } } - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn drop_object(&self, schema_name: &str, object_name: &str) -> StorageResult<()> { + fn drop_object( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>> { match self.schemas.read().expect("to acquire read lock").get(schema_name) { - Some(schema) => match schema.drop_tree(object_name.as_bytes()) { - Ok(true) => Ok(()), - Ok(false) => Err(StorageError::RuntimeCheckError), - Err(error) => Err(StorageError::SystemError(SledErrorMapper::map(error))), + Some(schema) => match self.drop_tree_with_failpoint(schema.clone(), object_name.as_bytes().into()) { + Ok(true) => Ok(Ok(Ok(()))), + Ok(false) => Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))), + Err(error) => match error { + SledError::Io(io_error) => Err(io_error), + SledError::Corruption { .. } => Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))), + }, }, - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn write(&self, schema_name: &str, object_name: &str, rows: Vec) -> StorageResult { + fn write( + &self, + schema_name: &str, + object_name: &str, + rows: Vec, + ) -> io::Result, StorageError>> { match self.schemas.read().expect("to acquire read lock").get(schema_name) { Some(schema) => { if schema.tree_names().contains(&(object_name.into())) { - match schema.open_tree(object_name) { - Ok(object) => { + match self.open_tree(schema.clone(), object_name) { + Ok(Ok(Ok(object))) => { let mut written_rows = 0; for (key, values) in rows.iter() { - match object - .insert::(key.to_bytes().into(), values.to_bytes().into()) - { + match self.insert_into_tree_with_failpoint(&object, key, values) { Ok(_) => written_rows += 1, - Err(error) => return Err(StorageError::SystemError(SledErrorMapper::map(error))), + Err(error) => match error { + SledError::Io(io_error) => return Err(io_error), + SledError::Corruption { .. } => return Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => return Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => return Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => { + return Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))) + } + }, + } + } + match self.tree_flush_with_failpoint(object) { + Ok(flushed) => { + log::trace!("{:?} data is written to {:?}.{:?}", rows, schema_name, object_name); + log::debug!("| inserted {:?} | flushed {:?} |", written_rows, flushed); + Ok(Ok(Ok(written_rows))) } + Err(error) => match error { + SledError::Io(io_error) => Err(io_error), + SledError::Corruption { .. } => Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => { + Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))) + } + }, } - object.flush().expect("Ok"); - log::info!("{:?} data is written to {:?}.{:?}", rows, schema_name, object_name); - Ok(written_rows) } - Err(error) => Err(StorageError::SystemError(SledErrorMapper::map(error))), + otherwise => otherwise.map(|io| io.map(|storage| storage.map(|_object| 0))), } } else { - Err(StorageError::RuntimeCheckError) + Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))) } } - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } - fn read(&self, schema_name: &str, object_name: &str) -> StorageResult { + fn read( + &self, + schema_name: &str, + object_name: &str, + ) -> io::Result, StorageError>> { match self.schemas.read().expect("to acquire read lock").get(schema_name) { Some(schema) => { if schema.tree_names().contains(&(object_name.into())) { - match schema.open_tree(object_name) { - Ok(object) => Ok(Box::new(object.iter().map(|item| match item { - Ok((key, values)) => { - Ok((Binary::with_data(key.to_vec()), Binary::with_data(values.to_vec()))) - } - Err(error) => Err(SledErrorMapper::map(error)), - }))), - Err(error) => Err(StorageError::SystemError(SledErrorMapper::map(error))), + match self.open_tree(schema.clone(), object_name) { + Ok(Ok(Ok(object))) => Ok(Ok(Ok(Box::new(self.iterator_over_tree_with_failpoint(object).map( + |item| match item { + Ok((key, values)) => Ok(Ok(( + Binary::with_data(key.to_vec()), + Binary::with_data(values.to_vec()), + ))), + Err(error) => match error { + SledError::Io(io_error) => Err(io_error), + SledError::Corruption { .. } => Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => Ok(Err(StorageError::Storage)), + }, + }, + ))))), + otherwise => otherwise.map(|io| io.map(|storage| storage.map(|_object| self.empty_iterator()))), } } else { log::error!( @@ -221,96 +332,80 @@ impl Database for PersistentDatabase { schema_name, object_name ); - Err(StorageError::RuntimeCheckError) + Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))) } } None => { - log::error!("No namespace with {:?} name found", schema_name); - Err(StorageError::RuntimeCheckError) + log::error!("No schema with {:?} name found", schema_name); + Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))) } } } - fn delete(&self, schema_name: &str, object_name: &str, keys: Vec) -> StorageResult { + fn delete( + &self, + schema_name: &str, + object_name: &str, + keys: Vec, + ) -> io::Result, StorageError>> { match self.schemas.read().expect("to acquire read lock").get(schema_name) { Some(schema) => { if schema.tree_names().contains(&(object_name.into())) { - let mut deleted = 0; - match schema.open_tree(object_name) { - Ok(object) => { + match self.open_tree(schema.clone(), object_name) { + Ok(Ok(Ok(object))) => { + let mut deleted = 0; for key in keys { - match object.remove(key.to_bytes()) { + match self.remove_fro_tree_with_failpoint(&object, key) { Ok(_) => deleted += 1, - Err(error) => return Err(StorageError::SystemError(SledErrorMapper::map(error))), + Err(error) => match error { + SledError::Io(io_error) => return Err(io_error), + SledError::Corruption { .. } => return Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => return Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => return Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => { + return Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))) + } + }, } } - object.flush().expect("Ok"); + match self.tree_flush_with_failpoint(object) { + Ok(flushed) => { + log::debug!("| inserted {:?} | flushed {:?} |", deleted, flushed); + Ok(Ok(Ok(deleted))) + } + Err(error) => match error { + SledError::Io(io_error) => Err(io_error), + SledError::Corruption { .. } => Ok(Err(StorageError::Storage)), + SledError::ReportableBug(_) => Ok(Err(StorageError::Storage)), + SledError::Unsupported(_) => Ok(Err(StorageError::Storage)), + SledError::CollectionNotFound(_) => { + Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))) + } + }, + } } - Err(error) => return Err(StorageError::SystemError(SledErrorMapper::map(error))), + otherwise => otherwise.map(|io| io.map(|storage| storage.map(|_object| 0))), } - Ok(deleted) } else { - Err(StorageError::RuntimeCheckError) + Ok(Ok(Err(DefinitionError::ObjectDoesNotExist))) } } - None => Err(StorageError::RuntimeCheckError), + None => Ok(Ok(Err(DefinitionError::SchemaDoesNotExist))), } } } -#[cfg(test)] -mod sled_error_mapper { - use super::*; - use sled::DiskPtr; - use std::io::{Error, ErrorKind}; - - #[test] - fn collection_not_found() { - assert_eq!( - SledErrorMapper::map(SledError::CollectionNotFound(sled::IVec::from("test"))), - SystemError::unrecoverable("System file [test] can't be found".to_owned()) - ) - } - - #[test] - fn unsupported() { - assert_eq!( - SledErrorMapper::map(SledError::Unsupported("NOT_SUPPORTED".to_owned())), - SystemError::unrecoverable("Unsupported operation [NOT_SUPPORTED] was used on Sled".to_owned()) - ) - } - - #[test] - fn corruption_with_position() { - let at = DiskPtr::Inline(900); - assert_eq!( - SledErrorMapper::map(SledError::Corruption { at: Some(at), bt: () }), - SystemError::unrecoverable(format!("Sled encountered corruption at {}", at)) - ) - } - - #[test] - fn corruption_without_position() { - assert_eq!( - SledErrorMapper::map(SledError::Corruption { at: None, bt: () }), - SystemError::unrecoverable("Sled encountered corruption".to_owned()) - ) - } - - #[test] - fn reportable_bug() { - let description = "SOME_BUG_HERE"; - assert_eq!( - SledErrorMapper::map(SledError::ReportableBug(description.to_owned())), - SystemError::unrecoverable(format!("Sled encountered reportable BUG: {}", description)) - ); - } - - #[test] - fn io() { - assert_eq!( - SledErrorMapper::map(SledError::Io(Error::new(ErrorKind::Other, "oh no!"))), - SystemError::io(Error::new(ErrorKind::Other, "oh no!")) - ) +fn sled_error(kind: Option) -> SledError { + match kind.as_deref() { + Some("io") => SledError::Io(ErrorKind::Other.into()), + Some("corruption") => SledError::Corruption { + at: Some(DiskPtr::Inline(500)), + bt: (), + }, + Some("bug") => SledError::ReportableBug("BUG".to_owned()), + Some("unsupported(core_structure)") => SledError::Unsupported("cannot remove the core structures".into()), + Some("unsupported") => SledError::Unsupported("Unsupported Operation".to_owned()), + Some("collection_not_found") => SledError::CollectionNotFound(vec![].into()), + _ => panic!("wrong sled error kind {:?}", &kind), } } diff --git a/src/storage/src/tests/in_memory.rs b/src/storage/src/tests/in_memory.rs index 44f3f0dd..18b95310 100644 --- a/src/storage/src/tests/in_memory.rs +++ b/src/storage/src/tests/in_memory.rs @@ -23,48 +23,84 @@ fn storage() -> Storage { } #[rstest::fixture] -fn with_namespace(storage: Storage) -> Storage { - storage.create_schema("namespace").expect("namespace created"); +fn with_schema(storage: Storage, schema_name: &'_ str) -> Storage { + storage + .create_schema(schema_name) + .expect("no io error") + .expect("no platform errors") + .expect("schema created"); storage } #[rstest::fixture] -fn with_object(with_namespace: Storage) -> Storage { - with_namespace - .create_object("namespace", "object_name") +fn with_object(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) -> Storage { + with_schema + .create_object(schema_name, object_name) + .expect("no io error") + .expect("no storage error") .expect("object created"); - with_namespace + with_schema } #[cfg(test)] -mod namespace { +mod schemas { use super::*; #[rstest::rstest] - fn create_namespaces_with_different_names(storage: Storage) { - assert_eq!(storage.create_schema("namespace_1"), Ok(())); - assert_eq!(storage.create_schema("namespace_2"), Ok(())); + fn create_schemas_with_different_names(storage: Storage) { + assert_eq!(storage.create_schema("schema_name_1").expect("no io error"), Ok(Ok(()))); + assert_eq!(storage.create_schema("schema_name_2").expect("no io error"), Ok(Ok(()))); } #[rstest::rstest] - fn drop_namespace(with_namespace: Storage) { - assert_eq!(with_namespace.drop_schema("namespace"), Ok(())); - assert_eq!(with_namespace.create_schema("namespace"), Ok(())); + fn drop_schema(with_schema: Storage, schema_name: &'_ str) { + assert_eq!(with_schema.drop_schema(schema_name).expect("no io error"), Ok(Ok(()))); + assert_eq!(with_schema.create_schema(schema_name).expect("no io error"), Ok(Ok(()))); } #[rstest::rstest] - fn dropping_namespace_drops_objects_in_it(with_namespace: Storage) { - with_namespace - .create_object("namespace", "object_name_1") + fn dropping_schema_drops_objects_in_it(with_schema: Storage, schema_name: &'_ str) { + with_schema + .create_object(schema_name, "object_name_1") + .expect("no io error") + .expect("no storage error") .expect("object created"); - with_namespace - .create_object("namespace", "object_name_2") + with_schema + .create_object(schema_name, "object_name_2") + .expect("no io error") + .expect("no storage error") .expect("object created"); - assert_eq!(with_namespace.drop_schema("namespace"), Ok(())); - assert_eq!(with_namespace.create_schema("namespace"), Ok(())); - assert_eq!(with_namespace.create_object("namespace", "object_name_1"), Ok(())); - assert_eq!(with_namespace.create_object("namespace", "object_name_2"), Ok(())); + assert_eq!(with_schema.drop_schema(schema_name).expect("no io error"), Ok(Ok(()))); + assert_eq!(with_schema.create_schema(schema_name).expect("no io error"), Ok(Ok(()))); + assert_eq!( + with_schema + .create_object(schema_name, "object_name_1") + .expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + with_schema + .create_object(schema_name, "object_name_2") + .expect("no io error"), + Ok(Ok(())) + ); + } + + #[rstest::rstest] + fn create_schema_with_the_same_name(with_schema: Storage, schema_name: &'_ str) { + assert_eq!( + with_schema.create_schema(schema_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaAlreadyExists)) + ) + } + + #[rstest::rstest] + fn drop_schema_that_does_not_exist(storage: Storage, schema_name: &'_ str) { + assert_eq!( + storage.drop_schema(schema_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ) } } @@ -73,17 +109,69 @@ mod create_object { use super::*; #[rstest::rstest] - fn create_objects_with_different_names(with_namespace: Storage) { - assert_eq!(with_namespace.create_object("namespace", "object_name_1"), Ok(())); - assert_eq!(with_namespace.create_object("namespace", "object_name_2"), Ok(())); + fn create_objects_with_different_names(with_schema: Storage, schema_name: &'_ str) { + assert_eq!( + with_schema + .create_object(schema_name, "object_name_1") + .expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + with_schema + .create_object(schema_name, "object_name_2") + .expect("no io error"), + Ok(Ok(())) + ); + } + + #[rstest::rstest] + fn create_objects_with_the_same_name_in_the_same_schema( + with_object: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert_eq!( + with_object + .create_object(schema_name, object_name) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectAlreadyExists)) + ) + } + + #[rstest::rstest] + fn create_objects_in_non_existent_schema(storage: Storage, object_name: &'_ str) { + assert_eq!( + storage + .create_object("does_not_exist", object_name) + .expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ) } #[rstest::rstest] fn create_object_with_the_same_name_in_different_namespaces(storage: Storage) { - storage.create_schema("namespace_1").expect("namespace created"); - storage.create_schema("namespace_2").expect("namespace created"); - assert_eq!(storage.create_object("namespace_1", "object_name"), Ok(())); - assert_eq!(storage.create_object("namespace_2", "object_name"), Ok(())); + storage + .create_schema("schema_name_1") + .expect("no io error") + .expect("no platform errors") + .expect("schema created"); + storage + .create_schema("schema_name_2") + .expect("no io error") + .expect("no platform errors") + .expect("schema created"); + assert_eq!( + storage + .create_object("schema_name_1", "object_name") + .expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + storage + .create_object("schema_name_2", "object_name") + .expect("no io error"), + Ok(Ok(())) + ); } } @@ -92,9 +180,33 @@ mod drop_object { use super::*; #[rstest::rstest] - fn drop_object(with_object: Storage) { - assert_eq!(with_object.drop_object("namespace", "object_name"), Ok(())); - assert_eq!(with_object.create_object("namespace", "object_name"), Ok(())); + fn drop_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert_eq!( + with_object.drop_object(schema_name, object_name).expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + with_object + .create_object(schema_name, object_name) + .expect("no io error"), + Ok(Ok(())) + ); + } + + #[rstest::rstest] + fn drop_object_from_schema_that_does_not_exist(storage: Storage, object_name: &'_ str) { + assert_eq!( + storage.drop_object("does_not_exist", object_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ); + } + + #[rstest::rstest] + fn drop_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert_eq!( + with_schema.drop_object(schema_name, object_name).expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); } } @@ -103,97 +215,206 @@ mod operations_on_object { use super::*; #[rstest::rstest] - fn insert_row_into_object(with_object: Storage) { + fn write_row_into_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { assert_eq!( - with_object.write("namespace", "object_name", as_rows(vec![(1u8, vec!["123"])])), - Ok(1) + with_schema + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) ); + } + #[rstest::rstest] + fn write_row_into_object_in_schema_that_does_not_exist( + storage: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert_eq!( + storage + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ); + } + + #[rstest::rstest] + fn write_read_row_into_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["123"])]).collect()) + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error"), + Ok(Ok(1)) + ); + + assert_eq!( + with_object + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["123"])]) + .map(|ok| ok.expect("no io error")) + .collect()) ); } #[rstest::rstest] - fn insert_many_rows_into_object(with_object: Storage) { + fn write_read_many_rows_into_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object - .write("namespace", "object_name", as_rows(vec![(1u8, vec!["123"])])) + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error") + .expect("no platform error") .expect("values are written"); with_object - .write("namespace", "object_name", as_rows(vec![(2u8, vec!["456"])])) + .write(schema_name, object_name, as_rows(vec![(2u8, vec!["456"])])) + .expect("no io error") + .expect("no platform error") .expect("values are written"); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["123"]), (2u8, vec!["456"])]).collect()) + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["123"]), (2u8, vec!["456"])]) + .map(|ok| ok.expect("no io error")) + .collect()) + ); + } + + #[rstest::rstest] + fn delete_from_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert_eq!( + with_schema + .delete(schema_name, object_name, vec![]) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); + } + + #[rstest::rstest] + fn delete_from_object_that_in_schema_that_does_not_exist( + storage: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert_eq!( + storage.delete(schema_name, object_name, vec![]).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) ); } #[rstest::rstest] - fn delete_some_records_from_object(with_object: Storage) { + fn write_delete_read_records_from_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object .write( - "namespace", - "object_name", + schema_name, + object_name, as_rows(vec![(1u8, vec!["123"]), (2u8, vec!["456"]), (3u8, vec!["789"])]), ) - .expect("write occurred"); + .expect("no io error") + .expect("no platform error") + .expect("values are written"); assert_eq!( - with_object.delete("namespace", "object_name", as_keys(vec![2u8])), - Ok(1) + with_object + .delete(schema_name, object_name, as_keys(vec![2u8])) + .expect("no io error"), + Ok(Ok(1)) ); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["123"]), (3u8, vec!["789"])]).collect()) + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["123"]), (3u8, vec!["789"])]) + .map(|ok| ok.expect("no io error")) + .collect()) ); } #[rstest::rstest] - fn select_all_from_object_with_many_columns(with_object: Storage) { + fn read_from_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert!(matches!( + with_schema.read(schema_name, object_name).expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + )); + } + + #[rstest::rstest] + fn read_from_object_that_in_schema_that_does_not_exist( + storage: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert!(matches!( + storage.read(schema_name, object_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + )); + } + + #[rstest::rstest] + fn read_all_from_object_with_many_columns(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object - .write("namespace", "object_name", as_rows(vec![(1u8, vec!["1", "2", "3"])])) - .expect("write occurred"); + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["1", "2", "3"])])) + .expect("no io error") + .expect("no platform error") + .expect("values are written"); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["1", "2", "3"])]).collect()) + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["1", "2", "3"])]) + .map(|ok| ok.expect("no io error")) + .collect()) ); } #[rstest::rstest] - fn insert_multiple_rows(with_object: Storage) { + fn write_read_multiple_columns(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object .write( - "namespace", - "object_name", + schema_name, + object_name, as_rows(vec![ (1u8, vec!["1", "2", "3"]), (2u8, vec!["4", "5", "6"]), (3u8, vec!["7", "8", "9"]), ]), ) - .expect("write occurred"); + .expect("no io error") + .expect("no platform error") + .expect("values are written"); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), Ok(as_read_cursor(vec![ (1u8, vec!["1", "2", "3"]), (2u8, vec!["4", "5", "6"]), (3u8, vec!["7", "8", "9"]) ]) + .map(|ok| ok.expect("no io error")) .collect()), ); } diff --git a/src/storage/src/tests/mod.rs b/src/storage/src/tests/mod.rs index e85bafb4..61851f31 100644 --- a/src/storage/src/tests/mod.rs +++ b/src/storage/src/tests/mod.rs @@ -19,6 +19,16 @@ mod in_memory; #[cfg(test)] mod persistent; +#[rstest::fixture] +fn schema_name() -> &'static str { + "schema_name" +} + +#[rstest::fixture] +fn object_name() -> &'static str { + "object_name" +} + fn as_rows(items: Vec<(u8, Vec<&'static str>)>) -> Vec { items .into_iter() @@ -51,6 +61,6 @@ fn as_read_cursor(items: Vec<(u8, Vec<&'static str>)>) -> ReadCursor { .map(|s| s.as_bytes()) .collect::>() .join(&b'|'); - Ok((Binary::with_data(k), Binary::with_data(v))) + Ok(Ok((Binary::with_data(k), Binary::with_data(v)))) })) } diff --git a/src/storage/src/tests/persistent.rs b/src/storage/src/tests/persistent.rs index 1482645c..882091f5 100644 --- a/src/storage/src/tests/persistent.rs +++ b/src/storage/src/tests/persistent.rs @@ -25,49 +25,84 @@ fn storage() -> Storage { } #[rstest::fixture] -fn with_namespace(storage: Storage) -> Storage { - storage.create_schema("namespace").expect("namespace created"); +fn with_schema(storage: Storage, schema_name: &'_ str) -> Storage { + storage + .create_schema(schema_name) + .expect("no io error") + .expect("no platform errors") + .expect("schema created"); storage } #[rstest::fixture] -fn with_object(with_namespace: Storage) -> Storage { - with_namespace - .create_object("namespace", "object_name") +fn with_object(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) -> Storage { + with_schema + .create_object(schema_name, object_name) + .expect("no io error") + .expect("no storage error") .expect("object created"); - with_namespace + with_schema } #[cfg(test)] -mod namespace { +mod schemas { use super::*; #[rstest::rstest] - fn create_namespaces_with_different_names(storage: Storage) { - assert_eq!(storage.create_schema("namespace_1"), Ok(())); - assert_eq!(storage.create_schema("namespace_2"), Ok(())); + fn create_schemas_with_different_names(storage: Storage) { + assert_eq!(storage.create_schema("schema_name_1").expect("no io error"), Ok(Ok(()))); + assert_eq!(storage.create_schema("schema_name_2").expect("no io error"), Ok(Ok(()))); } #[rstest::rstest] - fn drop_namespace(with_namespace: Storage) { - assert_eq!(with_namespace.drop_schema("namespace"), Ok(())); - assert_eq!(with_namespace.create_schema("namespace"), Ok(())); + fn drop_schema(with_schema: Storage, schema_name: &'_ str) { + assert_eq!(with_schema.drop_schema(schema_name).expect("no io error"), Ok(Ok(()))); + assert_eq!(with_schema.create_schema(schema_name).expect("no io error"), Ok(Ok(()))); } #[rstest::rstest] - #[ignore] - fn dropping_namespace_drops_objects_in_it(with_namespace: Storage) { - with_namespace - .create_object("namespace", "object_name_1") + fn dropping_schema_drops_objects_in_it(with_schema: Storage, schema_name: &'_ str) { + with_schema + .create_object(schema_name, "object_name_1") + .expect("no io error") + .expect("no storage error") .expect("object created"); - with_namespace - .create_object("namespace", "object_name_2") + with_schema + .create_object(schema_name, "object_name_2") + .expect("no io error") + .expect("no storage error") .expect("object created"); - assert_eq!(with_namespace.drop_schema("namespace"), Ok(())); - assert_eq!(with_namespace.create_schema("namespace"), Ok(())); - assert_eq!(with_namespace.create_object("namespace", "object_name_1"), Ok(())); - assert_eq!(with_namespace.create_object("namespace", "object_name_2"), Ok(())); + assert_eq!(with_schema.drop_schema(schema_name).expect("no io error"), Ok(Ok(()))); + assert_eq!(with_schema.create_schema(schema_name).expect("no io error"), Ok(Ok(()))); + assert_eq!( + with_schema + .create_object(schema_name, "object_name_1") + .expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + with_schema + .create_object(schema_name, "object_name_2") + .expect("no io error"), + Ok(Ok(())) + ); + } + + #[rstest::rstest] + fn create_schema_with_the_same_name(with_schema: Storage, schema_name: &'_ str) { + assert_eq!( + with_schema.create_schema(schema_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaAlreadyExists)) + ) + } + + #[rstest::rstest] + fn drop_schema_that_does_not_exist(storage: Storage, schema_name: &'_ str) { + assert_eq!( + storage.drop_schema(schema_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ) } } @@ -76,17 +111,69 @@ mod create_object { use super::*; #[rstest::rstest] - fn create_objects_with_different_names(with_namespace: Storage) { - assert_eq!(with_namespace.create_object("namespace", "object_name_1"), Ok(())); - assert_eq!(with_namespace.create_object("namespace", "object_name_2"), Ok(())); + fn create_objects_with_different_names(with_schema: Storage, schema_name: &'_ str) { + assert_eq!( + with_schema + .create_object(schema_name, "object_name_1") + .expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + with_schema + .create_object(schema_name, "object_name_2") + .expect("no io error"), + Ok(Ok(())) + ); + } + + #[rstest::rstest] + fn create_objects_with_the_same_name_in_the_same_schema( + with_object: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert_eq!( + with_object + .create_object(schema_name, object_name) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectAlreadyExists)) + ) + } + + #[rstest::rstest] + fn create_objects_in_non_existent_schema(storage: Storage, object_name: &'_ str) { + assert_eq!( + storage + .create_object("does_not_exist", object_name) + .expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ) } #[rstest::rstest] fn create_object_with_the_same_name_in_different_namespaces(storage: Storage) { - storage.create_schema("namespace_1").expect("namespace created"); - storage.create_schema("namespace_2").expect("namespace created"); - assert_eq!(storage.create_object("namespace_1", "object_name"), Ok(())); - assert_eq!(storage.create_object("namespace_2", "object_name"), Ok(())); + storage + .create_schema("schema_name_1") + .expect("no io error") + .expect("no platform errors") + .expect("schema created"); + storage + .create_schema("schema_name_2") + .expect("no io error") + .expect("no platform errors") + .expect("schema created"); + assert_eq!( + storage + .create_object("schema_name_1", "object_name") + .expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + storage + .create_object("schema_name_2", "object_name") + .expect("no io error"), + Ok(Ok(())) + ); } } @@ -95,9 +182,33 @@ mod drop_object { use super::*; #[rstest::rstest] - fn drop_object(with_object: Storage) { - assert_eq!(with_object.drop_object("namespace", "object_name"), Ok(())); - assert_eq!(with_object.create_object("namespace", "object_name"), Ok(())); + fn drop_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert_eq!( + with_object.drop_object(schema_name, object_name).expect("no io error"), + Ok(Ok(())) + ); + assert_eq!( + with_object + .create_object(schema_name, object_name) + .expect("no io error"), + Ok(Ok(())) + ); + } + + #[rstest::rstest] + fn drop_object_from_schema_that_does_not_exist(storage: Storage, object_name: &'_ str) { + assert_eq!( + storage.drop_object("does_not_exist", object_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ); + } + + #[rstest::rstest] + fn drop_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert_eq!( + with_schema.drop_object(schema_name, object_name).expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); } } @@ -106,97 +217,206 @@ mod operations_on_object { use super::*; #[rstest::rstest] - fn insert_row_into_object(with_object: Storage) { + fn write_row_into_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { assert_eq!( - with_object.write("namespace", "object_name", as_rows(vec![(1u8, vec!["123"])])), - Ok(1) + with_schema + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) ); + } + #[rstest::rstest] + fn write_row_into_object_in_schema_that_does_not_exist( + storage: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert_eq!( + storage + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + ); + } + + #[rstest::rstest] + fn write_read_row_into_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["123"])]).collect()) + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error"), + Ok(Ok(1)) + ); + + assert_eq!( + with_object + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["123"])]) + .map(|ok| ok.expect("no io error")) + .collect()) ); } #[rstest::rstest] - fn insert_many_rows_into_object(with_object: Storage) { + fn write_read_many_rows_into_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object - .write("namespace", "object_name", as_rows(vec![(1u8, vec!["123"])])) + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["123"])])) + .expect("no io error") + .expect("no platform error") .expect("values are written"); with_object - .write("namespace", "object_name", as_rows(vec![(2u8, vec!["456"])])) + .write(schema_name, object_name, as_rows(vec![(2u8, vec!["456"])])) + .expect("no io error") + .expect("no platform error") .expect("values are written"); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["123"]), (2u8, vec!["456"])]).collect()) + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["123"]), (2u8, vec!["456"])]) + .map(|ok| ok.expect("no io error")) + .collect()) + ); + } + + #[rstest::rstest] + fn delete_from_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert_eq!( + with_schema + .delete(schema_name, object_name, vec![]) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); + } + + #[rstest::rstest] + fn delete_from_object_that_in_schema_that_does_not_exist( + storage: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert_eq!( + storage.delete(schema_name, object_name, vec![]).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) ); } #[rstest::rstest] - fn delete_some_records_from_object(with_object: Storage) { + fn write_delete_read_records_from_object(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object .write( - "namespace", - "object_name", + schema_name, + object_name, as_rows(vec![(1u8, vec!["123"]), (2u8, vec!["456"]), (3u8, vec!["789"])]), ) - .expect("write occurred"); + .expect("no io error") + .expect("no platform error") + .expect("values are written"); assert_eq!( - with_object.delete("namespace", "object_name", as_keys(vec![2u8])), - Ok(1) + with_object + .delete(schema_name, object_name, as_keys(vec![2u8])) + .expect("no io error"), + Ok(Ok(1)) ); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["123"]), (3u8, vec!["789"])]).collect()) + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["123"]), (3u8, vec!["789"])]) + .map(|ok| ok.expect("no io error")) + .collect()) ); } #[rstest::rstest] - fn select_all_from_object_with_many_columns(with_object: Storage) { + fn read_from_object_that_does_not_exist(with_schema: Storage, schema_name: &'_ str, object_name: &'_ str) { + assert!(matches!( + with_schema.read(schema_name, object_name).expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + )); + } + + #[rstest::rstest] + fn read_from_object_that_in_schema_that_does_not_exist( + storage: Storage, + schema_name: &'_ str, + object_name: &'_ str, + ) { + assert!(matches!( + storage.read(schema_name, object_name).expect("no io error"), + Ok(Err(DefinitionError::SchemaDoesNotExist)) + )); + } + + #[rstest::rstest] + fn read_all_from_object_with_many_columns(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object - .write("namespace", "object_name", as_rows(vec![(1u8, vec!["1", "2", "3"])])) - .expect("write occurred"); + .write(schema_name, object_name, as_rows(vec![(1u8, vec!["1", "2", "3"])])) + .expect("no io error") + .expect("no platform error") + .expect("values are written"); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), - Ok(as_read_cursor(vec![(1u8, vec!["1", "2", "3"])]).collect()) + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), + Ok(as_read_cursor(vec![(1u8, vec!["1", "2", "3"])]) + .map(|ok| ok.expect("no io error")) + .collect()) ); } #[rstest::rstest] - fn insert_multiple_rows(with_object: Storage) { + fn write_read_multiple_columns(with_object: Storage, schema_name: &'_ str, object_name: &'_ str) { with_object .write( - "namespace", - "object_name", + schema_name, + object_name, as_rows(vec![ (1u8, vec!["1", "2", "3"]), (2u8, vec!["4", "5", "6"]), (3u8, vec!["7", "8", "9"]), ]), ) - .expect("write occurred"); + .expect("no io error") + .expect("no platform error") + .expect("values are written"); assert_eq!( with_object - .read("namespace", "object_name") - .map(|iter| iter.collect::>>()), + .read(schema_name, object_name) + .expect("no io error") + .expect("no platform error") + .map(|iter| iter + .map(|ok| ok.expect("no io error")) + .collect::>>()), Ok(as_read_cursor(vec![ (1u8, vec!["1", "2", "3"]), (2u8, vec!["4", "5", "6"]), (3u8, vec!["7", "8", "9"]) ]) + .map(|ok| ok.expect("no io error")) .collect()), ); } diff --git a/src/storage/tests/failpoints/drop_db.rs b/src/storage/tests/failpoints/drop_db.rs new file mode 100644 index 00000000..bd34abc9 --- /dev/null +++ b/src/storage/tests/failpoints/drop_db.rs @@ -0,0 +1,107 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use storage::{Database, PersistentDatabase, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + let storage = PersistentDatabase::new(root_path.into_path()); + storage + .create_schema("schema_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create schema"); + storage + .create_object("schema_name", "object_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create object"); + storage +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-db", "return(io)").unwrap(); + + assert_eq!( + database.drop_schema("schema_name").expect("no io error"), + Err(StorageError::CascadeIo(vec!["object_name".to_owned()])) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-db", "return(corruption)").unwrap(); + + assert_eq!( + database.drop_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-db", "return(bug)").unwrap(); + + assert_eq!( + database.drop_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation_core_structure(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-db", "return(unsupported(core_structure))").unwrap(); + + assert_eq!(database.drop_schema("schema_name").expect("no io error"), Ok(Ok(()))); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-db", "return(unsupported)").unwrap(); + + assert_eq!( + database.drop_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-db", "return(collection_not_found)").unwrap(); + + assert_eq!( + database.drop_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} diff --git a/src/storage/tests/failpoints/drop_tree.rs b/src/storage/tests/failpoints/drop_tree.rs new file mode 100644 index 00000000..7a611d87 --- /dev/null +++ b/src/storage/tests/failpoints/drop_tree.rs @@ -0,0 +1,95 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use storage::{Database, DefinitionError, PersistentDatabase, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + let storage = PersistentDatabase::new(root_path.into_path()); + storage + .create_schema("schema_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create schema"); + storage + .create_object("schema_name", "object_name") + .expect("no io error") + .expect("no storage error") + .expect("to create object"); + storage +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-tree", "return(io)").unwrap(); + + assert!(matches!(database.drop_object("schema_name", "object_name"), Err(_))); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-tree", "return(corruption)").unwrap(); + + assert_eq!( + database.drop_object("schema_name", "object_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-tree", "return(bug)").unwrap(); + + assert_eq!( + database.drop_object("schema_name", "object_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-tree", "return(unsupported)").unwrap(); + + assert_eq!( + database.drop_object("schema_name", "object_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-drop-tree", "return(collection_not_found)").unwrap(); + + assert_eq!( + database.drop_object("schema_name", "object_name").expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); + + scenario.teardown(); +} diff --git a/src/storage/tests/failpoints/flush_tree.rs b/src/storage/tests/failpoints/flush_tree.rs new file mode 100644 index 00000000..5142d346 --- /dev/null +++ b/src/storage/tests/failpoints/flush_tree.rs @@ -0,0 +1,127 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use representation::Binary; +use storage::{Database, DefinitionError, PersistentDatabase, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + let storage = PersistentDatabase::new(root_path.into_path()); + storage + .create_schema("schema_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create schema"); + storage + .create_object("schema_name", "object_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create object"); + storage +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(io)").unwrap(); + + assert!(matches!( + database.write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ), + Err(_) + )); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(corruption)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(bug)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(unsupported)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(collection_not_found)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); + + scenario.teardown(); +} diff --git a/src/storage/tests/failpoints/insert_into_tree.rs b/src/storage/tests/failpoints/insert_into_tree.rs new file mode 100644 index 00000000..5142d346 --- /dev/null +++ b/src/storage/tests/failpoints/insert_into_tree.rs @@ -0,0 +1,127 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use representation::Binary; +use storage::{Database, DefinitionError, PersistentDatabase, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + let storage = PersistentDatabase::new(root_path.into_path()); + storage + .create_schema("schema_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create schema"); + storage + .create_object("schema_name", "object_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create object"); + storage +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(io)").unwrap(); + + assert!(matches!( + database.write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ), + Err(_) + )); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(corruption)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(bug)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(unsupported)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-insert-into-tree", "return(collection_not_found)").unwrap(); + + assert_eq!( + database + .write( + "schema_name", + "object_name", + vec![(Binary::with_data(vec![]), Binary::with_data(vec![]))] + ) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); + + scenario.teardown(); +} diff --git a/src/storage/tests/failpoints/iterate_over_tree.rs b/src/storage/tests/failpoints/iterate_over_tree.rs new file mode 100644 index 00000000..53bc4ec1 --- /dev/null +++ b/src/storage/tests/failpoints/iterate_over_tree.rs @@ -0,0 +1,128 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use storage::{Database, PersistentDatabase, RowResult, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + let storage = PersistentDatabase::new(root_path.into_path()); + storage + .create_schema("schema_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create schema"); + storage + .create_object("schema_name", "object_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create object"); + storage +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-iterate-over-tree", "return(io)").unwrap(); + + assert!(matches!( + database + .read("schema_name", "object_name") + .expect("no io error") + .expect("no platform error") + .expect("read data from tree") + .collect::>() + .as_slice(), + &[Err(_)] + )); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-iterate-over-tree", "return(corruption)").unwrap(); + + assert!(matches!( + database + .read("schema_name", "object_name") + .expect("no io error") + .expect("no platform error") + .expect("read data from tree") + .collect::>() + .as_slice(), + &[Ok(Err(StorageError::Storage))] + )); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-iterate-over-tree", "return(bug)").unwrap(); + + assert!(matches!( + database + .read("schema_name", "object_name") + .expect("no io error") + .expect("no platform error") + .expect("read data from tree") + .collect::>() + .as_slice(), + &[Ok(Err(StorageError::Storage))] + )); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-iterate-over-tree", "return(unsupported)").unwrap(); + + assert!(matches!( + database + .read("schema_name", "object_name") + .expect("no io error") + .expect("no platform error") + .expect("read data from tree") + .collect::>() + .as_slice(), + &[Ok(Err(StorageError::Storage))] + )); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-iterate-over-tree", "return(collection_not_found)").unwrap(); + + assert!(matches!( + database + .read("schema_name", "object_name") + .expect("no io error") + .expect("no platform error") + .expect("read data from tree") + .collect::>() + .as_slice(), + &[Ok(Err(StorageError::Storage))] + )); + + scenario.teardown(); +} diff --git a/src/storage/tests/failpoints/open_db.rs b/src/storage/tests/failpoints/open_db.rs new file mode 100644 index 00000000..e065c2b4 --- /dev/null +++ b/src/storage/tests/failpoints/open_db.rs @@ -0,0 +1,84 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use storage::{Database, PersistentDatabase, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + PersistentDatabase::new(root_path.into_path()) +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-db", "return(io)").unwrap(); + + assert!(matches!(database.create_schema("schema_name"), Err(_))); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-db", "return(corruption)").unwrap(); + + assert_eq!( + database.create_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-db", "return(bug)").unwrap(); + + assert_eq!( + database.create_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-db", "return(unsupported)").unwrap(); + + assert_eq!( + database.create_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-db", "return(collection_not_found)").unwrap(); + + assert_eq!( + database.create_schema("schema_name").expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} diff --git a/src/storage/tests/failpoints/open_tree.rs b/src/storage/tests/failpoints/open_tree.rs new file mode 100644 index 00000000..b702aaa1 --- /dev/null +++ b/src/storage/tests/failpoints/open_tree.rs @@ -0,0 +1,98 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use storage::{Database, DefinitionError, PersistentDatabase, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + let storage = PersistentDatabase::new(root_path.into_path()); + storage + .create_schema("schema_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create schema"); + storage +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-tree", "return(io)").unwrap(); + + assert!(matches!(database.create_object("schema_name", "object_name"), Err(_))); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-tree", "return(corruption)").unwrap(); + + assert_eq!( + database + .create_object("schema_name", "object_name") + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-tree", "return(bug)").unwrap(); + + assert_eq!( + database + .create_object("schema_name", "object_name") + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-tree", "return(unsupported)").unwrap(); + + assert_eq!( + database + .create_object("schema_name", "object_name") + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-open-tree", "return(collection_not_found)").unwrap(); + + assert_eq!( + database + .create_object("schema_name", "object_name") + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); + + scenario.teardown(); +} diff --git a/src/storage/tests/failpoints/remove_from_tree.rs b/src/storage/tests/failpoints/remove_from_tree.rs new file mode 100644 index 00000000..b24b83ad --- /dev/null +++ b/src/storage/tests/failpoints/remove_from_tree.rs @@ -0,0 +1,107 @@ +// Copyright 2020 Alex Dukhno +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fail::FailScenario; +use representation::Binary; +use storage::{Database, DefinitionError, PersistentDatabase, StorageError}; + +#[rstest::fixture] +fn scenario() -> FailScenario<'static> { + FailScenario::setup() +} + +#[rstest::fixture] +fn database() -> PersistentDatabase { + let root_path = tempfile::tempdir().expect("to create temporary folder"); + let storage = PersistentDatabase::new(root_path.into_path()); + storage + .create_schema("schema_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create schema"); + storage + .create_object("schema_name", "object_name") + .expect("no io error") + .expect("no platform errors") + .expect("to create object"); + storage +} + +#[rstest::rstest] +fn io_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-remove-from-tree", "return(io)").unwrap(); + + assert!(matches!( + database.delete("schema_name", "object_name", vec![Binary::with_data(vec![])]), + Err(_) + )); + + scenario.teardown(); +} + +#[rstest::rstest] +fn corruption_error(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-remove-from-tree", "return(corruption)").unwrap(); + + assert_eq!( + database + .delete("schema_name", "object_name", vec![Binary::with_data(vec![])]) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn reportable_bug(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-remove-from-tree", "return(bug)").unwrap(); + + assert_eq!( + database + .delete("schema_name", "object_name", vec![Binary::with_data(vec![])]) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn unsupported_operation(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-remove-from-tree", "return(unsupported)").unwrap(); + + assert_eq!( + database + .delete("schema_name", "object_name", vec![Binary::with_data(vec![])]) + .expect("no io error"), + Err(StorageError::Storage) + ); + + scenario.teardown(); +} + +#[rstest::rstest] +fn collection_not_found(database: PersistentDatabase, scenario: FailScenario) { + fail::cfg("sled-fail-to-remove-from-tree", "return(collection_not_found)").unwrap(); + + assert_eq!( + database + .delete("schema_name", "object_name", vec![Binary::with_data(vec![])]) + .expect("no io error"), + Ok(Err(DefinitionError::ObjectDoesNotExist)) + ); + + scenario.teardown(); +}