From 624ef1dc5c0ca923f540d3c07159b92954f328e8 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 16 Mar 2023 11:35:00 +0800 Subject: [PATCH 1/3] feat: don't allow create table which failed when open --- analytic_engine/src/instance/create.rs | 12 +++++++++++- analytic_engine/src/instance/engine.rs | 8 ++++++++ analytic_engine/src/instance/open.rs | 20 +++++++++++++------- analytic_engine/src/space.rs | 16 +++++++++++++++- analytic_engine/src/table/data.rs | 13 ++----------- 5 files changed, 49 insertions(+), 20 deletions(-) diff --git a/analytic_engine/src/instance/create.rs b/analytic_engine/src/instance/create.rs index cd5d53e37c..8cb0ef9524 100644 --- a/analytic_engine/src/instance/create.rs +++ b/analytic_engine/src/instance/create.rs @@ -12,7 +12,10 @@ use tokio::sync::oneshot; use crate::{ instance::{ - engine::{CreateTableData, InvalidOptions, OperateByWriteWorker, Result, WriteManifest}, + engine::{ + CreateOpenFailedTable, CreateTableData, InvalidOptions, OperateByWriteWorker, Result, + WriteManifest, + }, write_worker::{self, CreateTableCommand, WorkerLocal}, Instance, }, @@ -31,6 +34,13 @@ impl Instance { ) -> Result { info!("Instance create table, request:{:?}", request); + if space.is_open_failed_table(&request.table_name) { + return CreateOpenFailedTable { + table: request.table_name, + } + .fail(); + } + let mut table_opts = table_options::merge_table_options_for_create(&request.options, &self.table_opts) .box_err() diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index 1285c37e9e..959c1454cc 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -212,6 +212,13 @@ pub enum Error { table: String, source: GenericError, }, + + #[snafu(display( + "{} open failed and can not be created again.\nBacktrace:\n{}", + table, + backtrace, + ))] + CreateOpenFailedTable { table: String, backtrace: Backtrace }, } define_result!(Error); @@ -241,6 +248,7 @@ impl From for 
table_engine::engine::Error { | Error::FlushTable { .. } | Error::StoreVersionEdit { .. } | Error::EncodePayloads { .. } + | Error::CreateOpenFailedTable { .. } | Error::DoManifestSnapshot { .. } => Self::Unexpected { source: Box::new(err), }, diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 2357baf12d..8f686703ac 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -189,13 +189,19 @@ impl Instance { ..Default::default() }; - self.recover_table_from_wal( - worker_local, - table_data.clone(), - replay_batch_size, - &read_ctx, - ) - .await?; + if let Err(e) = self + .recover_table_from_wal( + worker_local, + table_data.clone(), + replay_batch_size, + &read_ctx, + ) + .await + { + error!("Recovery table failed, table_data:{table_data:?}"); + space.insert_open_failed_table(table_data.name.to_string()); + return Err(e); + } space.insert_table(table_data.clone()); Ok(Some(table_data)) diff --git a/analytic_engine/src/space.rs b/analytic_engine/src/space.rs index 18128bf800..a5f635fff3 100644 --- a/analytic_engine/src/space.rs +++ b/analytic_engine/src/space.rs @@ -92,6 +92,11 @@ pub struct Space { /// lock table_datas: RwLock, + /// If a table fails to open, requests for it are not allowed; otherwise its + /// schema may become inconsistent. + // TODO: the engine should provide a repair method to fix these failed tables. 
+ open_failed_tables: RwLock>, + /// Write workers pub write_group: WriteGroup, /// Space memtable memory usage collector @@ -111,8 +116,9 @@ impl Space { Self { id, context, - table_datas: RwLock::new(TableDataSet::new()), write_group, + table_datas: Default::default(), + open_failed_tables: Default::default(), mem_usage_collector: Arc::new(MemUsageCollector::with_parent(engine_mem_collector)), write_buffer_size, } @@ -159,6 +165,14 @@ impl Space { assert!(success); } + pub(crate) fn insert_open_failed_table(&self, table_name: String) { + self.open_failed_tables.write().unwrap().push(table_name) + } + + pub(crate) fn is_open_failed_table(&self, table_name: &String) -> bool { + self.open_failed_tables.read().unwrap().contains(table_name) + } + /// Find table under this space by table name pub fn find_table(&self, table_name: &str) -> Option { self.table_datas.read().unwrap().find_table(table_name) diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index b1a6a5a08c..9896b60a36 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -522,7 +522,7 @@ pub struct TableLocation { pub type TableDataRef = Arc; /// Manages TableDataRef -#[derive(Debug)] +#[derive(Debug, Default)] pub struct TableDataSet { /// Name to table data table_datas: HashMap, @@ -533,10 +533,7 @@ pub struct TableDataSet { impl TableDataSet { /// Create an empty TableDataSet pub fn new() -> Self { - Self { - table_datas: HashMap::new(), - id_to_tables: HashMap::new(), - } + Self::default() } /// Insert if absent, if successfully inserted, return true and return @@ -596,12 +593,6 @@ impl TableDataSet { } } -impl Default for TableDataSet { - fn default() -> Self { - Self::new() - } -} - #[cfg(test)] pub mod tests { use std::sync::Arc; From d328918764f078d28c1210362d7b5823beabcac6 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Thu, 16 Mar 2023 16:52:23 +0800 Subject: [PATCH 2/3] Update analytic_engine/src/instance/engine.rs 
Co-authored-by: chunshao.rcs --- analytic_engine/src/instance/engine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index 959c1454cc..5256e7c920 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -214,7 +214,7 @@ pub enum Error { }, #[snafu(display( - "{} open failed and can not be created again.\nBacktrace:\n{}", + "Table open failed and can not be created again, table:{}.\nBacktrace:\n{}", table, backtrace, ))] From 191f7c2138e22fe74e64e4ab4abce988d708f8a5 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 16 Mar 2023 16:58:30 +0800 Subject: [PATCH 3/3] fix CR --- analytic_engine/src/instance/open.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 8f686703ac..e66fbfd77b 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -189,19 +189,18 @@ impl Instance { ..Default::default() }; - if let Err(e) = self - .recover_table_from_wal( - worker_local, - table_data.clone(), - replay_batch_size, - &read_ctx, - ) - .await - { - error!("Recovery table failed, table_data:{table_data:?}"); + self.recover_table_from_wal( + worker_local, + table_data.clone(), + replay_batch_size, + &read_ctx, + ) + .await + .map_err(|e| { + error!("Recovery table from wal failed, table_data:{table_data:?}, err:{e}"); space.insert_open_failed_table(table_data.name.to_string()); - return Err(e); - } + e + })?; space.insert_table(table_data.clone()); Ok(Some(table_data))